{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Vowpal Wabbit and LightGBM for a Regression Problem\n",
    "\n",
    "This notebook shows how to build simple regression models by using [Vowpal Wabbit (VW)](https://github.com/VowpalWabbit/vowpal_wabbit) and [LightGBM](https://github.com/microsoft/LightGBM) with MMLSpark. We also compares the results with [Spark MLlib Linear Regression](https://spark.apache.org/docs/latest/ml-classification-regression.html#linear-regression)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import math\n",
    "from matplotlib.colors import ListedColormap, Normalize\n",
    "from matplotlib.cm import get_cmap\n",
    "import matplotlib.pyplot as plt\n",
    "from mmlspark.train import ComputeModelStatistics\n",
    "from mmlspark.vw import VowpalWabbitRegressor, VowpalWabbitFeaturizer\n",
    "from mmlspark.lightgbm import LightGBMRegressor\n",
    "import numpy as np\n",
    "import pandas as pd\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "from pyspark.ml.regression import LinearRegression\n",
    "import pyspark.sql.types as T\n",
    "from sklearn.datasets import load_boston"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prepare Dataset\n",
    "We use [*Boston house price* dataset](https://scikit-learn.org/stable/modules/generated/sklearn.datasets.load_boston.html) because of the ease of access and simplicity which is good to start with. The data was collected in 1978 from Boston area and consists of 506 entries with 14 features including the value of homes. We use `sklearn.datasets` module to download it easily, then split the set into training and testing by 75/25."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "boston = load_boston()\n",
    "\n",
    "feature_cols = ['f' + str(i) for i in range(boston.data.shape[1])]\n",
    "header = ['target'] + feature_cols\n",
    "df = spark.createDataFrame(\n",
    "    pd.DataFrame(data=np.column_stack((boston.target, boston.data)), columns=header)\n",
    ")\n",
    "display(df.limit(10).toPandas())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "train_data, test_data = df.randomSplit([0.75, 0.25], seed=42)\n",
    "train_data.cache()\n",
    "test_data.cache()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Following is the summary of the training set."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "display(train_data.summary().toPandas())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Plot feature distributions over different target values (house prices in our case)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "features = train_data.columns[1:]\n",
    "values = train_data.drop('target').toPandas()\n",
    "ncols = 5\n",
    "nrows = math.ceil(len(features) / ncols)\n",
    "\n",
    "yy = [r['target'] for r in train_data.select('target').collect()]\n",
    "\n",
    "f, axes = plt.subplots(nrows, ncols, sharey=True, figsize=(30,10))\n",
    "f.tight_layout()\n",
    "\n",
    "for irow in range(nrows):\n",
    "    axes[irow][0].set_ylabel('target')\n",
    "    for icol in range(ncols):\n",
    "        try:\n",
    "            feat = features[irow*ncols + icol]\n",
    "            xx = values[feat]\n",
    "\n",
    "            axes[irow][icol].scatter(xx, yy, s=10, alpha=0.25)\n",
    "            axes[irow][icol].set_xlabel(feat)\n",
    "            axes[irow][icol].get_yaxis().set_ticks([])\n",
    "        except IndexError:\n",
    "            f.delaxes(axes[irow][icol])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Baseline - Spark MLlib Linear Regressor\n",
    "\n",
    "First, we set a baseline performance by using Linear Regressor in Spark MLlib."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "featurizer = VectorAssembler(\n",
    "    inputCols=feature_cols,\n",
    "    outputCol='features'\n",
    ")\n",
    "lr_train_data = featurizer.transform(train_data)['target', 'features']\n",
    "lr_test_data = featurizer.transform(test_data)['target', 'features']\n",
    "display(lr_train_data.limit(10).toPandas())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# By default, `maxIter` is 100. Other params you may want to change include: `regParam`, `elasticNetParam`, etc.\n",
    "lr = LinearRegression(\n",
    "    labelCol='target',\n",
    ")\n",
    "\n",
    "lr_model = lr.fit(lr_train_data)\n",
    "lr_predictions = lr_model.transform(lr_test_data)\n",
    "\n",
    "display(lr_predictions.limit(10).toPandas())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We evaluate the prediction result by using `mmlspark.train.ComputeModelStatistics` which returns four metrics:\n",
    "* [MSE (Mean Squared Error)](https://en.wikipedia.org/wiki/Mean_squared_error)\n",
    "* [RMSE (Root Mean Squared Error)](https://en.wikipedia.org/wiki/Root-mean-square_deviation) = sqrt(MSE)\n",
    "* [R quared](https://en.wikipedia.org/wiki/Coefficient_of_determination)\n",
    "* [MAE (Mean Absolute Error)](https://en.wikipedia.org/wiki/Mean_absolute_error)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metrics = ComputeModelStatistics(\n",
    "    evaluationMetric='regression',\n",
    "    labelCol='target',\n",
    "    scoresCol='prediction'\n",
    ").transform(lr_predictions)\n",
    "\n",
    "results = metrics.toPandas()\n",
    "results.insert(0, 'model', ['Spark MLlib - Linear Regression'])\n",
    "display(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Vowpal Wabbit"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Perform VW-style feature hashing. Many types (numbers, string, bool, map of string to (number, string)) are supported."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "vw_featurizer = VowpalWabbitFeaturizer(\n",
    "    inputCols=feature_cols,\n",
    "    outputCol='features',\n",
    ")\n",
    "vw_train_data = vw_featurizer.transform(train_data)['target', 'features']\n",
    "vw_test_data = vw_featurizer.transform(test_data)['target', 'features']\n",
    "display(vw_train_data.limit(10).toPandas())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "See [VW wiki](https://github.com/vowpalWabbit/vowpal_wabbit/wiki/Command-Line-Arguments) for command line arguments."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use the same number of iterations as Spark MLlib's Linear Regression (=100)\n",
    "args = \"--holdout_off --loss_function quantile -l 7 -q :: --power_t 0.3\"\n",
    "vwr = VowpalWabbitRegressor(\n",
    "    labelCol='target',\n",
    "    args=args,\n",
    "    numPasses=100,\n",
    ")\n",
    "\n",
    "# To reduce number of partitions (which will effect performance), use `vw_train_data.coalesce(1)`\n",
    "vw_model = vwr.fit(vw_train_data.coalesce(1))\n",
    "vw_predictions = vw_model.transform(vw_test_data)\n",
    "\n",
    "display(vw_predictions.limit(10).toPandas())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metrics = ComputeModelStatistics(\n",
    "    evaluationMetric='regression',\n",
    "    labelCol='target',\n",
    "    scoresCol='prediction'\n",
    ").transform(vw_predictions)\n",
    "\n",
    "vw_result = metrics.toPandas()\n",
    "vw_result.insert(0, 'model', ['Vowpal Wabbit'])\n",
    "results = results.append(\n",
    "    vw_result,\n",
    "    ignore_index=True\n",
    ")\n",
    "display(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## LightGBM"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lgr = LightGBMRegressor(\n",
    "    objective='quantile',\n",
    "    alpha=0.2,\n",
    "    learningRate=0.3,\n",
    "    numLeaves=31,\n",
    "    labelCol='target',\n",
    "    numIterations=100,\n",
    ")\n",
    "\n",
    "# Using one partition since the training dataset is very small\n",
    "lg_model = lgr.fit(lr_train_data.coalesce(1))\n",
    "lg_predictions = lg_model.transform(lr_test_data)\n",
    "\n",
    "display(lg_predictions.limit(10).toPandas())"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "metrics = ComputeModelStatistics(\n",
    "    evaluationMetric='regression',\n",
    "    labelCol='target',\n",
    "    scoresCol='prediction'\n",
    ").transform(lg_predictions)\n",
    "\n",
    "lg_result = metrics.toPandas()\n",
    "lg_result.insert(0, 'model', ['LightGBM'])\n",
    "results = results.append(\n",
    "    lg_result,\n",
    "    ignore_index=True\n",
    ")\n",
    "display(results)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Following figure shows the actual-vs.-prediction graphs of the results:\n",
    "\n",
    "<img width=\"1102\" alt=\"lr-vw-lg\" src=\"https://user-images.githubusercontent.com/42475935/64071975-4c3e9600-cc54-11e9-8b1f-9a1ee300f445.png\">"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cmap = get_cmap('YlOrRd')\n",
    "\n",
    "target = np.array(test_data.select('target').collect()).flatten()\n",
    "model_preds = [\n",
    "    (\"Spark MLlib Linear Regression\", lr_predictions),\n",
    "    (\"Vowpal Wabbit\", vw_predictions),\n",
    "    (\"LightGBM\", lg_predictions)\n",
    "]\n",
    "\n",
    "f, axes = plt.subplots(1, len(model_preds), sharey=True, figsize=(18, 6))\n",
    "f.tight_layout()\n",
    "\n",
    "for i, (model_name, preds) in enumerate(model_preds):\n",
    "    preds = np.array(preds.select('prediction').collect()).flatten()\n",
    "    err = np.absolute(preds - target)\n",
    "\n",
    "    norm = Normalize()\n",
    "    clrs = cmap(np.asarray(norm(err)))[:, :-1]\n",
    "    axes[i].scatter(preds, target, s=60, c=clrs, edgecolors='#888888', alpha=0.75)\n",
    "    axes[i].plot((0, 60), (0, 60), linestyle='--', color='#888888')\n",
    "    axes[i].set_xlabel('Predicted values')\n",
    "    if i ==0:\n",
    "        axes[i].set_ylabel('Actual values')\n",
    "    axes[i].set_title(model_name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  },
  "name": "mmlspark example - regression",
  "notebookId": 1395284431467721
 },
 "nbformat": 4,
 "nbformat_minor": 1
}
